TimeSeriesForestRegressor (TSF): interval features + tree ensemble#
The Time-Series Forest regressor is an interval-based ensemble:

- Each tree samples random intervals from the input series.
- On each interval it computes simple summary features (commonly mean, standard deviation, slope).
- The tree learns a mapping from these features to the target.
- The forest prediction is the average across trees.
This notebook implements a practical, sklearn-style `TimeSeriesForestRegressor(min_interval=..., ...)`.
If you want a version where you provide your own feature function list, see ../composable_time_series_forest_regressor/overview.ipynb.
Core intuition (with math)#
Let \(x \in \mathbb{R}^m\) be one univariate series of length \(m\) (e.g., a sliding window of past values).
For one tree \(b\):
1. Sample \(K\) random intervals \(I_k = [s_k, e_k)\) with \(0 \le s_k < e_k \le m\).
2. For each interval, compute simple summary features, for example the mean and standard deviation:

   \[ \text{mean}(I_k) = \frac{1}{|I_k|} \sum_{t \in I_k} x_t, \qquad \text{std}(I_k) = \sqrt{\frac{1}{|I_k|} \sum_{t \in I_k} \left(x_t - \text{mean}(I_k)\right)^2} \]

   For the slope, fit a least-squares line \(x_t \approx \alpha + \beta t\) over the interval indices \(t = 0, \dots, |I_k| - 1\):

   \[ \hat{\beta} = \frac{\sum_{t} (t - \bar{t})(x_t - \bar{x})}{\sum_{t} (t - \bar{t})^2} \]

3. Concatenate all features into a vector \(\phi_b(x) \in \mathbb{R}^{3K}\).
4. Fit a decision tree regressor \(f_b\) on \((\phi_b(x_i), y_i)\).

The forest predicts the average over \(B\) trees:

\[ \hat{y}(x) = \frac{1}{B} \sum_{b=1}^{B} f_b\!\left(\phi_b(x)\right) \]
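To make \(\phi_b\) concrete before the full implementation below, here is a minimal hand-rolled sketch; the toy series, the three intervals, and all variable names are invented for this example:

import numpy as np

x = np.sin(np.linspace(0, 4, 20))        # one toy series, m = 20
intervals = [(0, 5), (3, 12), (10, 20)]  # K = 3 hand-picked intervals

phi = []
for s, e in intervals:
    seg = x[s:e]
    t = np.arange(e - s)
    slope = np.polyfit(t, seg, 1)[0]     # least-squares slope over the interval
    phi.extend([seg.mean(), seg.std(), slope])

phi = np.asarray(phi)                    # phi has length 3K = 9
print(phi.shape)                         # (9,)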
`min_interval` controls the shortest allowed interval length. A small `min_interval` captures spikes and local shape, but the features can be noisy. A large `min_interval` emphasizes slower patterns (level/trend fragments), but can miss sharp events. A small sweep over `min_interval` appears at the end of the notebook.
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import os
import plotly.io as pio
from sklearn.tree import DecisionTreeRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
pio.renderers.default = os.environ.get("PLOTLY_RENDERER", "notebook")
pio.templates.default = "plotly_white"
rng = np.random.default_rng(7)
import numpy, pandas, sklearn, plotly
print("numpy:", numpy.__version__)
print("pandas:", pandas.__version__)
print("sklearn:", sklearn.__version__)
print("plotly:", plotly.__version__)
numpy: 1.26.2
pandas: 2.1.3
sklearn: 1.6.0
plotly: 6.5.2
def _as_3d_panel(X: np.ndarray) -> np.ndarray:
"""Accept (n, m) or (n, d, m). Return (n, d, m)."""
X = np.asarray(X, dtype=float)
if X.ndim == 2:
return X[:, None, :]
if X.ndim == 3:
return X
raise ValueError(f"X must be 2D or 3D, got shape={X.shape}")
def _random_intervals(
n_timepoints: int,
n_intervals: int,
*,
min_length: int,
max_length: int | None,
rng: np.random.Generator,
) -> list[tuple[int, int]]:
n_timepoints = int(n_timepoints)
min_length = max(2, int(min_length))
if max_length is None:
max_length = n_timepoints
max_length = int(min(max_length, n_timepoints))
if min_length > max_length:
raise ValueError("min_length cannot exceed max_length")
intervals: list[tuple[int, int]] = []
for _ in range(int(n_intervals)):
length = int(rng.integers(min_length, max_length + 1))
start = int(rng.integers(0, n_timepoints - length + 1))
end = start + length
intervals.append((start, end))
return intervals
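# Example draw (illustrative only): four half-open (start, end) intervals on a
# length-20 series, with lengths constrained to [3, 8].
_demo_rng = np.random.default_rng(0)
print(_random_intervals(20, 4, min_length=3, max_length=8, rng=_demo_rng))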
def _f_mean(seg2d: np.ndarray) -> np.ndarray:
return np.mean(seg2d, axis=1)
def _f_std(seg2d: np.ndarray) -> np.ndarray:
return np.std(seg2d, axis=1, ddof=0)
def _f_slope(seg2d: np.ndarray) -> np.ndarray:
"""Least-squares slope vs time index within the interval."""
seg2d = np.asarray(seg2d, dtype=float)
n, L = seg2d.shape
t = np.arange(L, dtype=float)
t_mean = float(t.mean())
denom = float(np.sum((t - t_mean) ** 2))
if denom == 0.0:
return np.zeros(n, dtype=float)
x_mean = np.mean(seg2d, axis=1)
num = np.sum((t[None, :] - t_mean) * (seg2d - x_mean[:, None]), axis=1)
return num / denom
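# Sanity check for _f_slope (illustrative only): a noiseless line x_t = 2 + 0.5 t
# should recover slope 0.5, matching np.polyfit on the same segment.
_line = (2.0 + 0.5 * np.arange(10))[None, :]
assert np.isclose(_f_slope(_line)[0], 0.5)
assert np.isclose(np.polyfit(np.arange(10.0), _line[0], 1)[0], 0.5)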
def _transform_intervals(
X3: np.ndarray,
intervals: list[tuple[int, int]],
) -> np.ndarray:
"""Turn panel (n, d, m) into tabular features for the given intervals."""
X3 = _as_3d_panel(X3)
n, d, m = X3.shape
feats = []
for dim in range(d):
x = X3[:, dim, :]
for (start, end) in intervals:
seg = x[:, start:end]
feats.append(_f_mean(seg))
feats.append(_f_std(seg))
feats.append(_f_slope(seg))
return np.column_stack(feats)
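# Shape check for _transform_intervals (illustrative only): with d dims and K
# intervals, the tabular output has 3 * K * d columns (mean, std, slope each).
_demo_X = np.random.default_rng(1).normal(size=(5, 1, 20))
assert _transform_intervals(_demo_X, [(0, 5), (4, 12)]).shape == (5, 6)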
class TimeSeriesForestRegressor:
"""Time-Series Forest (TSF) regressor (interval features + tree ensemble).
This is a compact educational implementation with a practical API.
Parameters
----------
n_estimators : int
Number of trees.
n_intervals : int | "sqrt"
Intervals sampled per tree. "sqrt" uses int(sqrt(m)).
min_interval : int
Minimum interval length.
max_interval : int | None
Maximum interval length (default: m).
max_depth, min_samples_leaf : DecisionTreeRegressor params
Passed through to each tree.
random_state : int | None
Reproducible interval sampling + trees.
"""
def __init__(
self,
*,
n_estimators: int = 200,
n_intervals: int | str = "sqrt",
min_interval: int = 3,
max_interval: int | None = None,
max_depth: int | None = None,
min_samples_leaf: int = 1,
random_state: int | None = None,
) -> None:
self.n_estimators = int(n_estimators)
self.n_intervals = n_intervals
self.min_interval = int(min_interval)
self.max_interval = max_interval
self.max_depth = max_depth
self.min_samples_leaf = int(min_samples_leaf)
self.random_state = random_state
self.estimators_: list[DecisionTreeRegressor] | None = None
self.intervals_: list[list[tuple[int, int]]] | None = None
self.n_timepoints_: int | None = None
self.n_dims_: int | None = None
def _resolve_n_intervals(self, m: int) -> int:
if isinstance(self.n_intervals, str):
if self.n_intervals != "sqrt":
raise ValueError("n_intervals must be int or 'sqrt'")
return max(1, int(np.sqrt(m)))
return int(self.n_intervals)
def fit(self, X: np.ndarray, y: np.ndarray) -> "TimeSeriesForestRegressor":
X3 = _as_3d_panel(X)
y = np.asarray(y, dtype=float)
if y.ndim != 1:
raise ValueError("y must be 1D")
if X3.shape[0] != y.shape[0]:
raise ValueError("X and y must have the same number of samples")
n, d, m = X3.shape
self.n_timepoints_ = int(m)
self.n_dims_ = int(d)
n_intervals = self._resolve_n_intervals(m)
rng = np.random.default_rng(self.random_state)
self.estimators_ = []
self.intervals_ = []
for _ in range(self.n_estimators):
seed = int(rng.integers(0, 2**32 - 1))
tree_rng = np.random.default_rng(seed)
intervals = _random_intervals(
n_timepoints=m,
n_intervals=n_intervals,
min_length=self.min_interval,
max_length=self.max_interval,
rng=tree_rng,
)
Phi = _transform_intervals(X3, intervals)
tree = DecisionTreeRegressor(
random_state=seed,
max_depth=self.max_depth,
min_samples_leaf=self.min_samples_leaf,
)
tree.fit(Phi, y)
self.estimators_.append(tree)
self.intervals_.append(intervals)
return self
def predict(self, X: np.ndarray) -> np.ndarray:
if self.estimators_ is None or self.intervals_ is None or self.n_timepoints_ is None:
raise ValueError("Model is not fitted")
X3 = _as_3d_panel(X)
if X3.shape[1] != int(self.n_dims_):
raise ValueError("X has different n_dims than seen in fit")
if X3.shape[2] != int(self.n_timepoints_):
raise ValueError("X has different n_timepoints than seen in fit")
preds = []
for tree, intervals in zip(self.estimators_, self.intervals_):
Phi = _transform_intervals(X3, intervals)
preds.append(tree.predict(Phi))
return np.mean(np.vstack(preds), axis=0)
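Before moving to real data, a quick smoke test on random inputs (purely illustrative; the shapes and targets are made up) checks the fit/predict round trip and the averaged output shape:

X_demo = rng.normal(size=(30, 24))                      # 30 windows of length 24
y_demo = X_demo.mean(axis=1) + rng.normal(0, 0.1, 30)   # target loosely tied to window level
demo_model = TimeSeriesForestRegressor(n_estimators=10, random_state=0).fit(X_demo, y_demo)
assert demo_model.predict(X_demo).shape == (30,)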
# --- Synthetic daily series (trend + weekly + monthly seasonality + autocorrelated noise) ---
def simulate_ar1_noise(n: int, *, phi: float, sigma: float, rng: np.random.Generator) -> np.ndarray:
eps = rng.normal(0.0, sigma, size=n)
u = np.zeros(n)
for t in range(1, n):
u[t] = phi * u[t - 1] + eps[t]
return u
n = 700
idx = pd.date_range("2022-01-01", periods=n, freq="D")
t = np.arange(n)
weekly = 2.0 * np.sin(2 * np.pi * t / 7) + 0.5 * np.cos(2 * np.pi * t / 7)
monthly = 1.3 * np.sin(2 * np.pi * t / 30) - 0.4 * np.cos(2 * np.pi * t / 30)
trend = 0.02 * t
noise = simulate_ar1_noise(n, phi=0.6, sigma=0.7, rng=rng)
y = 50.0 + trend + weekly + monthly + noise
y = pd.Series(y, index=idx, name="y")
fig = go.Figure()
fig.add_trace(go.Scatter(x=y.index, y=y.values, name="y", line=dict(color="black")))
fig.update_layout(title="Synthetic series", xaxis_title="date", yaxis_title="value", height=420)
fig.show()
def make_sliding_windows(y: np.ndarray, window_length: int) -> tuple[np.ndarray, np.ndarray]:
y = np.asarray(y, dtype=float)
L = int(window_length)
if L < 2:
raise ValueError("window_length must be >= 2")
if y.size <= L:
raise ValueError("y is too short for the chosen window_length")
X = np.column_stack([y[i : y.size - L + i] for i in range(L)])
y_next = y[L:]
return X, y_next
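# Tiny example (illustrative only): with window_length=3, each row of X holds
# three consecutive values and y_next is the value immediately after the window.
_Xd, _yd = make_sliding_windows(np.arange(6.0), window_length=3)
# _Xd -> [[0,1,2],[1,2,3],[2,3,4]];  _yd -> [3., 4., 5.]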
L = 60
X, y_next = make_sliding_windows(y.to_numpy(), window_length=L)
# time-aware split
h = 120
X_train, y_train = X[:-h], y_next[:-h]
X_test, y_test = X[-h:], y_next[-h:]
t_test = y.index[-h:]
X_train.shape, X_test.shape
((520, 60), (120, 60))
tsf = TimeSeriesForestRegressor(
n_estimators=250,
n_intervals="sqrt",
min_interval=5,
max_interval=None,
max_depth=None,
min_samples_leaf=2,
random_state=7,
)
tsf.fit(X_train, y_train)
y_pred = tsf.predict(X_test)
mae = float(mean_absolute_error(y_test, y_pred))
rmse = float(np.sqrt(mean_squared_error(y_test, y_pred)))
r2 = float(r2_score(y_test, y_pred))
print("MAE:", mae)
print("RMSE:", rmse)
print("R^2:", r2)
MAE: 2.0957017770743134
RMSE: 2.589710861221658
R^2: -0.506010971830327
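A negative \(R^2\) means the forest does worse on this test slice than simply predicting the test-set mean. For scale, here is a persistence baseline (added for context; not part of the original experiment) that predicts each target as the last value of its input window:

y_pred_naive = X_test[:, -1]  # persistence: tomorrow == today
print("naive MAE:", mean_absolute_error(y_test, y_pred_naive))
print("naive RMSE:", float(np.sqrt(mean_squared_error(y_test, y_pred_naive))))
print("naive R^2:", r2_score(y_test, y_pred_naive))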
fig = go.Figure()
fig.add_trace(go.Scatter(x=t_test, y=y_test, name="actual", line=dict(color="black")))
fig.add_trace(go.Scatter(x=t_test, y=y_pred, name="pred (1-step)", line=dict(color="#4E79A7")))
fig.update_layout(title="TimeSeriesForestRegressor one-step-ahead predictions", xaxis_title="date", yaxis_title="value")
fig.show()
# Visual intuition: show some random intervals used by one tree over one input window
tree_idx = 0
intervals = tsf.intervals_[tree_idx]
window = X_test[-1]
x_axis = np.arange(window.size)
fig = go.Figure()
fig.add_trace(go.Scatter(x=x_axis, y=window, name="window", line=dict(color="black")))
for (s, e) in intervals[:12]:
fig.add_vrect(x0=s, x1=e - 1, fillcolor="rgba(78,121,167,0.10)", line_width=0)
fig.update_layout(
title=f"Example random intervals (tree {tree_idx}) over one input window",
xaxis_title="lag index in window",
yaxis_title="value",
)
fig.show()
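To revisit the `min_interval` trade-off from the top of the notebook, a small sweep (illustrative; the grid of lengths is chosen arbitrarily) refits the forest at a few interval lengths and compares test MAE:

for mi in (3, 10, 25):
    model = TimeSeriesForestRegressor(
        n_estimators=100,
        min_interval=mi,
        min_samples_leaf=2,
        random_state=7,
    ).fit(X_train, y_train)
    pred = model.predict(X_test)
    print(f"min_interval={mi:2d}  MAE={mean_absolute_error(y_test, pred):.3f}")

Shorter intervals let trees key on local shape inside the 60-lag window; longer ones emphasize level and trend fragments instead.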